﻿using Autofac;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using Uiap.EventBus.Abstrations;
using Uiap.EventBus.Events;
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Uiap.Infrastructure.Extensions;

namespace Uiap.EventBus.RabbitMQ
{
    public class EventBusRabbitMQ : IEventBus, IDisposable
    {
        #region fields

        private const string BROKER_NAME = "silverlining_event_bus";

        private readonly IRabbitMQPersistentConnection _connection;
        private readonly ILogger<EventBusRabbitMQ> _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private const int RETRY_COUNT = 5;
        private readonly ILifetimeScope _autoFac;

        private IModel _consumerChannel;
        private readonly string _queueName;

        #endregion

        #region ctors

        public EventBusRabbitMQ(IRabbitMQPersistentConnection connection,
            ILogger<EventBusRabbitMQ> logger,
            ILifetimeScope autoFac,
            IEventBusSubscriptionsManager subsManager)
        {
            _connection = connection;
            _subsManager = subsManager;
            _logger = logger;
            _autoFac = autoFac;
            _queueName = "event_bus_test_queue";
            _consumerChannel = CreateConsumerChannel();
            _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
        }

        #endregion

        #region methods

        /// <summary>
        /// 移除事件处理
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void SubsManager_OnEventRemoved(object sender, string e)
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }

            using (var channel = _connection.CreateModel())
            {
                channel.QueueUnbind(queue: _queueName,
                    exchange: BROKER_NAME,
                    routingKey: e);

                if (_subsManager.IsEmpty)
                {
                    _consumerChannel.Close();
                }
            }
        }

        /// <summary>
        /// 创建AMQP信道
        /// </summary>
        /// <returns></returns>
        private IModel CreateConsumerChannel()
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }

            _logger.LogInformation("开始创建RabbitMQ消费者信道...");

            var channel = _connection.CreateModel();

            //声明一个非持久化、非自动删除的交换器
            channel.ExchangeDeclare(exchange: BROKER_NAME, type: ExchangeType.Direct);

            //声明一个持久化、非排外、不自动删除的队列
            channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            channel.CallbackException += (sender, ea) =>
            {
                _logger.LogWarning($"重新创建RabbitMQ消费者信道", ea.Exception);

                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
                StartBasicConsume();
            };

            return channel;
        }

        /// <summary>
        /// 开始订阅队列消息
        /// </summary>
        private void StartBasicConsume()
        {
            _logger.LogInformation("开始订阅队列消息");

            if (_consumerChannel != null)
            {
                var consumer = new AsyncEventingBasicConsumer(_consumerChannel);

                //订阅成功接收事件
                consumer.Received += Consumer_Received;

                //开启消息订阅，不自动确认消息，通过手动确认方式
                _consumerChannel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
            }
            else
            {
                _logger.LogWarning("调用StartBasicConsume时_consumerChannel=null");
            }
        }

        /// <summary>
        /// 消息接收处理及确认
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="event"></param>
        /// <returns></returns>
        private async Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
        {
            var eventName = @event.RoutingKey;
            var message = Encoding.UTF8.GetString(@event.Body);

            try
            {
                if (message.ToLowerInvariant().Contains("throw-fake-exception"))
                {
                    throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
                }

                ///处理消息
                await ProcessEvent(eventName, message);

                //确认消息
                //TODO：考虑使用DLX（Dead Letters Exchange）
                _consumerChannel.BasicAck(@event.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                _logger.LogError($"处理消息时异常：{message}", ex);
            }
        }

        /// <summary>
        /// 处理事件
        /// </summary>
        /// <param name="eventName">事件名称</param>
        /// <param name="message">事件消息</param>
        /// <returns></returns>
        private async Task ProcessEvent(string eventName, string message)
        {
            _logger.LogInformation($"开始处理事件：{eventName}");

            if (_subsManager.HasSubscriptionForEvent(eventName))
            {
                using var scope = _autoFac.BeginLifetimeScope("event_bus_test");
                var subscriptions = _subsManager.GetHandlerForEvent(eventName);
                foreach (var subscription in subscriptions)
                {
                    if (subscription.IsDynamic)
                    {
                        var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                        if (handler == null) continue;
                        dynamic eventData = JObject.Parse(message);
                        await handler.Handle(eventData);
                    }
                    else
                    {
                        var handler = scope.ResolveOptional(subscription.HandlerType);
                        if (handler == null) continue;
                        var eventType = _subsManager.GetEventTypeByName(eventName);
                        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);

                        //实例
                        var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                        await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                    }
                }
            }
            else
            {
                _logger.LogInformation($"没有找到事件{eventName}的消费者！");
            }
        }

        /// <summary>
        /// 发布事件
        /// </summary>
        /// <param name="event"></param>
        public void Publish(IntegrationEvent @event)
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }

            var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(RETRY_COUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    _logger.LogWarning($"({time.TotalSeconds:n1}s后)无法发布事件，EventId:{@event.Id}，原因:{ex.Message}", ex);
                });

            var eventName = @event.GetType().Name;

            _logger.LogInformation($"开始创建RabbitMQ信道以发布事件：{@event.Id}({eventName})");
            using (var channel = _connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: BROKER_NAME, type: ExchangeType.Direct);

                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);

                policy.Execute(() =>
                {
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; //持久化消息

                    channel.BasicPublish(exchange: BROKER_NAME,     //交换器名称
                        routingKey: eventName,  //以事件名称作为路由key
                        mandatory: true,      //当mandatory标志位设置为true时，如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息，
                                              //那么broker会调用basic.return方法将消息返还给生产者; 当mandatory设置为false时，出现上述情况broker会直接将消息丢弃;
                                              //通俗的讲，mandatory标志告诉broker代理服务器至少将消息route到一个队列中，否则就将消息return给发送者
                        basicProperties: properties,
                        body: body);
                });
            }
        }

        /// <summary>
        /// 订阅集成事件
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <typeparam name="TH"></typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventName<T>();
            DoInternalSubscription(eventName);

            _logger.LogInformation($"订阅事件{eventName}，处理器为{typeof(TH).GetGenericTypeName()}");

            _subsManager.AddSubscription<T, TH>();
            StartBasicConsume();
        }

        /// <summary>
        /// 订阅动态类型事件
        /// </summary>
        /// <typeparam name="TH"></typeparam>
        /// <param name="eventName"></param>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            DoInternalSubscription(eventName);

            _logger.LogInformation($"订阅事件{eventName}，处理器为{typeof(TH).GetGenericTypeName()}");

            _subsManager.AddDynamicSubscription<TH>(eventName);
            StartBasicConsume();
        }

        /// <summary>
        /// 将事件绑定到队列
        /// </summary>
        /// <param name="eventName"></param>
        private void DoInternalSubscription(string eventName)
        {
            if (!_subsManager.HasSubscriptionForEvent(eventName))
            {
                if (!_connection.IsConnected)
                {
                    _connection.TryConnect();
                }

                using (var channel = _connection.CreateModel())
                {
                    channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                }
            }
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <typeparam name="TH"></typeparam>
        /// <param name="eventName"></param>
        public void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _logger.LogInformation($"取消订阅事件：{eventName}");

            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <typeparam name="TH"></typeparam>
        public void Unsubscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventName<T>();

            _logger.LogInformation($"取消订阅事件：{eventName}");

            _subsManager.RemoveSubscription<T, TH>();
        }

        public void Dispose()
        {
            if (_consumerChannel != null)
            {
                _consumerChannel.Dispose();
            }

            _subsManager.Clear();
        }

        #endregion
    }
}
